{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "(graph-concepts)=\n",
    "# Graph concepts and state machine"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A graph is composed of the following:\n",
    "\n",
    "* Step: A step runs a function or class handler or a REST API call. MLRun comes with a list of [pre-built steps](./available-steps.html) that include data manipulation, readers, writers and model serving. You can also write your own steps using \n",
    "    standard Python functions or custom functions/classes, or can be a external REST API (the special `$remote` class).\n",
    "* Router: A special type of step is a router with routing logic and multiple child routes/models. The basic \n",
    "    routing logic is to route to the child routes based on the `event.path`. More advanced or custom routing can be used,\n",
    "    for example, the ensemble router sends the event to all child routes in parallel, aggregates the result and responds.\n",
    "* Queue: A queue or stream that accepts data from one or more source steps and publishes to one or more output steps. \n",
    "    Queues are best used to connect independent functions/containers. Queues can run in-memory or be implemented using a stream, which allows it to span processes/containers.\n",
    "    \n",
    "The graph server has two modes of operation (topologies):\n",
    "\n",
    "* Router topology (default): A minimal configuration with a single router and child tasks/routes. This can be used for simple model serving or single hop configurations.\n",
    "* Flow topology: A full graph/DAG. The flow topology is implemented using two engines: async (the default)\n",
    "is based on [Storey](https://github.com/mlrun/storey) and asynchronous event loop; and `sync`, which supports a simple\n",
    "sequence of steps.\n",
    "\n",
    "**In this section**\n",
    "- [The event object](#the-event-object)\n",
    "- [The context object](#the-context-object)\n",
    "- [Topology](#topology)\n",
    "- [Building distributed graphs](#building-distributed-graphs)\n",
    "- [Error handling](#error-handling)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## The Event object\n",
    "\n",
    "The graph state machine accepts an event object (similar to a Nuclio event) and passes \n",
    "it along the pipeline. An event object hosts the event `body` along with other attributes \n",
    "such as `path` (http request path), `method` (GET, POST, ..), and `id` (unique event ID).\n",
    "\n",
    "In some cases the events represent a record with a unique `key`, which can be read/set \n",
    "through the `event.key`.\n",
    "\n",
    "The task steps are called with the `event.body` by default. If a task step needs to \n",
    "read or set other event elements (key, path, time, ..) you should set the task `full_event`\n",
    "argument to `True`.\n",
    "\n",
    "Task steps support optional `input_path` and `result_path` attributes that allow controlling which portion of \n",
    "the event is sent as input to the step, and where to update the returned result.\n",
    "\n",
    "For example, for an event body `{\"req\": {\"body\": \"x\"}}`, `input_path=\"req.body\"` and `result_path=\"resp\"` \n",
    "the step gets `\"x\"` as the input. The output after the step is `{\"req\": {\"body\": \"x\"}: \"resp\": <step output>}`.\n",
    "Note that `input_path` and `result_path` do not work together with `full_event=True`."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## The context object\n",
    "\n",
    "The step classes are initialized with a `context` object (when they have `context` in their `__init__` args).\n",
    "The context is used to pass data and for interfacing with system services. The context object has the \n",
    "following attributes and methods.\n",
    "\n",
    "Attributes:\n",
    "* **logger**: Central logger (Nuclio logger when running in Nuclio).\n",
    "* **verbose**: True if in verbose/debug mode.\n",
    "* **root**: The graph object.\n",
    "* **current_function**: When running in a distributed graph, the current child function name.\n",
    "\n",
    "Methods:\n",
    "* **get_param(key, default=None)**: Get the graph parameter by key. Parameters are set at the\n",
    "  serving function (e.g. `function.spec.parameters = {\"param1\": \"x\"}`).\n",
    "* **get_secret(key)**: Get the value of a project/user secret.\n",
    "* **get_store_resource(uri, use_cache=True)**: Get the mlrun store object (data item, artifact, model, feature set, feature vector).\n",
    "* **get_remote_endpoint(name, external=False)**: Return the remote nuclio/serving function http(s) endpoint given its [project/]function-name[:tag].\n",
    "* **Response(headers=None, body=None, content_type=None, status_code=200)**: Create a nuclio response object, for returning detailed http responses.\n",
    "\n",
    "Example, using the context:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "if self.context.verbose:\n",
    "    self.context.logger.info(\"my message\", some_arg=\"text\")\n",
    "    x = self.context.get_param(\"x\", 0)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Topology"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Router\n",
    "Once you have a serving function, you need to choose the graph topology. The default is `router` topology. With the `router` topology you can specify different machine learning models. Each model has a logical name. This name is used to route to the correct model when calling the serving function."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "> 2021-11-02 04:18:36,925 [info] model model1 was loaded\n",
      "> 2021-11-02 04:18:36,926 [info] Initializing endpoint records\n",
      "> 2021-11-02 04:18:36,965 [info] Loaded ['model1']\n",
      "{'id': '6bd11e864805484ea888f58e478d1f91', 'model_name': 'model1', 'outputs': [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2]}\n"
     ]
    }
   ],
   "source": [
    "from sklearn.datasets import load_iris\n",
    "\n",
    "# set the topology/router\n",
    "graph = fn.set_topology(\"router\")\n",
    "\n",
    "# Add the model\n",
    "fn.add_model(\n",
    "    \"model1\",\n",
    "    class_name=\"ClassifierModel\",\n",
    "    model_path=\"https://s3.wasabisys.com/iguazio/models/iris/model.pkl\",\n",
    ")\n",
    "\n",
    "# Add additional models\n",
    "# fn.add_model(\"model2\", class_name=\"ClassifierModel\", model_path=\"<path2>\")\n",
    "\n",
    "# create and use the graph simulator\n",
    "server = fn.to_mock_server()\n",
    "x = load_iris()[\"data\"].tolist()\n",
    "result = server.test(\"/v2/models/model1/infer\", {\"inputs\": x})\n",
    "\n",
    "print(result)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Flow\n",
    "\n",
    "Using the `flow` topology, you can specify tasks, which typically manipulate the data. The most common scenario is pre-processing of data prior to the model execution.\n",
    "\n",
    "```{note} Once the topology is set, you cannot change an existing function topology.\n",
    "```\n",
    "\n",
    "In this topology, you build and connect the graph (DAG) by adding steps using the `step.to()` method, or by using the \n",
    "`graph.add_step()` method.\n",
    "\n",
    "> The `step.to()` is typically used to chain steps together. `graph.add_step` can add steps anywhere on the\n",
    "> graph and has `before` and `after` parameters to specify the location of the step."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<!-- show example without router -->"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {
    "scrolled": true
   },
   "outputs": [
    {
     "data": {
      "image/svg+xml": [
       "<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"no\"?>\n",
       "<!DOCTYPE svg PUBLIC \"-//W3C//DTD SVG 1.1//EN\"\n",
       " \"http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd\">\n",
       "<!-- Generated by graphviz version 2.40.1 (20161225.0304)\n",
       " -->\n",
       "<!-- Title: mlrun&#45;flow Pages: 1 -->\n",
       "<svg width=\"432pt\" height=\"84pt\"\n",
       " viewBox=\"0.00 0.00 432.42 84.00\" xmlns=\"http://www.w3.org/2000/svg\" xmlns:xlink=\"http://www.w3.org/1999/xlink\">\n",
       "<g id=\"graph0\" class=\"graph\" transform=\"scale(1 1) rotate(0) translate(4 80)\">\n",
       "<title>mlrun&#45;flow</title>\n",
       "<polygon fill=\"#ffffff\" stroke=\"transparent\" points=\"-4,4 -4,-80 428.425,-80 428.425,4 -4,4\"/>\n",
       "<g id=\"clust1\" class=\"cluster\">\n",
       "<title>cluster_router</title>\n",
       "<polygon fill=\"none\" stroke=\"#000000\" points=\"215.1919,-8 215.1919,-68 416.425,-68 416.425,-8 215.1919,-8\"/>\n",
       "</g>\n",
       "<!-- _start -->\n",
       "<g id=\"node1\" class=\"node\">\n",
       "<title>_start</title>\n",
       "<polygon fill=\"#d3d3d3\" stroke=\"#000000\" points=\"38.5476,-20.0493 40.698,-20.1479 42.8263,-20.2953 44.9236,-20.4913 46.9815,-20.7353 48.9917,-21.0266 50.9463,-21.3645 52.8377,-21.7479 54.6587,-22.1759 56.4025,-22.6472 58.0628,-23.1606 59.634,-23.7147 61.1107,-24.308 62.4882,-24.9388 63.7625,-25.6054 64.9302,-26.3059 65.9882,-27.0385 66.9343,-27.8012 67.7669,-28.5918 68.4849,-29.4082 69.0878,-30.2481 69.5758,-31.1093 69.9496,-31.9894 70.2102,-32.886 70.3595,-33.7965 70.3997,-34.7186 70.3334,-35.6497 70.1636,-36.5873 69.8937,-37.5287 69.5276,-38.4713 69.0691,-39.4127 68.5225,-40.3503 67.8923,-41.2814 67.1831,-42.2035 66.3996,-43.114 65.5464,-44.0106 64.6285,-44.8907 63.6504,-45.7519 62.617,-46.5918 61.5329,-47.4082 60.4024,-48.1988 59.2299,-48.9615 58.0197,-49.6941 56.7755,-50.3946 55.5012,-51.0612 54.2002,-51.692 52.8757,-52.2853 51.5309,-52.8394 50.1684,-53.3528 48.7908,-53.8241 47.4003,-54.2521 45.9989,-54.6355 44.5886,-54.9734 43.1708,-55.2647 41.7472,-55.5087 40.3189,-55.7047 38.8872,-55.8521 37.4531,-55.9507 36.0175,-56 34.5815,-56 33.146,-55.9507 31.7119,-55.8521 30.2801,-55.7047 28.8519,-55.5087 27.4282,-55.2647 26.0105,-54.9734 24.6001,-54.6355 23.1988,-54.2521 21.8083,-53.8241 20.4306,-53.3528 19.0681,-52.8394 17.7233,-52.2853 16.3989,-51.692 15.0979,-51.0612 13.8236,-50.3946 12.5794,-49.6941 11.3691,-48.9615 10.1967,-48.1988 9.0662,-47.4082 7.982,-46.5918 6.9486,-45.7519 5.9706,-44.8907 5.0526,-44.0106 4.1995,-43.114 3.4159,-42.2035 2.7067,-41.2814 2.0765,-40.3503 1.53,-39.4127 1.0715,-38.4713 .7053,-37.5287 .4355,-36.5873 .2657,-35.6497 .1993,-34.7186 .2395,-33.7965 .3888,-32.886 .6495,-31.9894 1.0232,-31.1093 1.5112,-30.2481 2.1141,-29.4082 2.8321,-28.5918 3.6647,-27.8012 4.6109,-27.0385 5.6689,-26.3059 6.8365,-25.6054 8.1108,-24.9388 9.4884,-24.308 10.9651,-23.7147 12.5362,-23.1606 14.1966,-22.6472 15.9404,-22.1759 17.7614,-21.7479 19.6528,-21.3645 21.6074,-21.0266 23.6176,-20.7353 25.6755,-20.4913 27.7728,-20.2953 29.901,-20.1479 32.0515,-20.0493 34.2154,-20 36.3837,-20 38.5476,-20.0493\"/>\n",
       "<text text-anchor=\"middle\" x=\"35.2995\" y=\"-34.3\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">start</text>\n",
       "</g>\n",
       "<!-- enrich -->\n",
       "<g id=\"node2\" class=\"node\">\n",
       "<title>enrich</title>\n",
       "<ellipse fill=\"none\" stroke=\"#000000\" cx=\"146.8955\" cy=\"-38\" rx=\"40.0939\" ry=\"18\"/>\n",
       "<text text-anchor=\"middle\" x=\"146.8955\" y=\"-34.3\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">enrich</text>\n",
       "</g>\n",
       "<!-- _start&#45;&gt;enrich -->\n",
       "<g id=\"edge1\" class=\"edge\">\n",
       "<title>_start&#45;&gt;enrich</title>\n",
       "<path fill=\"none\" stroke=\"#000000\" d=\"M69.9975,-38C78.2752,-38 87.2796,-38 96.1148,-38\"/>\n",
       "<polygon fill=\"#000000\" stroke=\"#000000\" points=\"96.3553,-41.5001 106.3553,-38 96.3552,-34.5001 96.3553,-41.5001\"/>\n",
       "</g>\n",
       "<!-- router -->\n",
       "<g id=\"node3\" class=\"node\">\n",
       "<title>router</title>\n",
       "<polygon fill=\"none\" stroke=\"#000000\" points=\"314.532,-30.5442 314.532,-45.4558 288.9193,-56 252.6976,-56 227.0849,-45.4558 227.0849,-30.5442 252.6976,-20 288.9193,-20 314.532,-30.5442\"/>\n",
       "<polygon fill=\"none\" stroke=\"#000000\" points=\"318.5418,-27.8677 318.5418,-48.1323 289.7142,-60 251.9027,-60 223.0751,-48.1323 223.0751,-27.8677 251.9027,-16 289.7142,-16 318.5418,-27.8677\"/>\n",
       "<text text-anchor=\"middle\" x=\"270.8084\" y=\"-34.3\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">router</text>\n",
       "</g>\n",
       "<!-- enrich&#45;&gt;router -->\n",
       "<g id=\"edge3\" class=\"edge\">\n",
       "<title>enrich&#45;&gt;router</title>\n",
       "<path fill=\"none\" stroke=\"#000000\" d=\"M187.4692,-38C195.498,-38 204.0728,-38 212.5547,-38\"/>\n",
       "<polygon fill=\"#000000\" stroke=\"#000000\" points=\"212.8118,-41.5001 222.8118,-38 212.8118,-34.5001 212.8118,-41.5001\"/>\n",
       "</g>\n",
       "<!-- router/m1 -->\n",
       "<g id=\"node4\" class=\"node\">\n",
       "<title>router/m1</title>\n",
       "<ellipse fill=\"none\" stroke=\"#000000\" cx=\"381.425\" cy=\"-38\" rx=\"27\" ry=\"18\"/>\n",
       "<text text-anchor=\"middle\" x=\"381.425\" y=\"-34.3\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">m1</text>\n",
       "</g>\n",
       "<!-- router&#45;&gt;router/m1 -->\n",
       "<g id=\"edge2\" class=\"edge\">\n",
       "<title>router&#45;&gt;router/m1</title>\n",
       "<path fill=\"none\" stroke=\"#000000\" d=\"M318.6831,-38C327.1277,-38 335.8077,-38 343.9164,-38\"/>\n",
       "<polygon fill=\"#000000\" stroke=\"#000000\" points=\"344.1792,-41.5001 354.1791,-38 344.1791,-34.5001 344.1792,-41.5001\"/>\n",
       "</g>\n",
       "</g>\n",
       "</svg>\n"
      ],
      "text/plain": [
       "<graphviz.dot.Digraph at 0x7fd46e4dda50>"
      ]
     },
     "execution_count": 18,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "fn2 = mlrun.code_to_function(\n",
    "    \"serving_example_flow\", kind=\"serving\", image=\"mlrun/mlrun\"\n",
    ")\n",
    "\n",
    "graph2 = fn2.set_topology(\"flow\")\n",
    "\n",
    "graph2_enrich = graph2.to(\"storey.Extend\", name=\"enrich\", _fn='({\"tag\": \"something\"})')\n",
    "\n",
    "# add an Ensemble router with two child models (routes)\n",
    "router = graph2.add_step(mlrun.serving.ModelRouter(), name=\"router\", after=\"enrich\")\n",
    "router.add_route(\n",
    "    \"m1\",\n",
    "    class_name=\"ClassifierModel\",\n",
    "    model_path=\"https://s3.wasabisys.com/iguazio/models/iris/model.pkl\",\n",
    ")\n",
    "router.respond()\n",
    "\n",
    "# Add additional models\n",
    "# router.add_route(\"m2\", class_name=\"ClassifierModel\", model_path=path2)\n",
    "\n",
    "# plot the graph (using Graphviz)\n",
    "graph2.plot(rankdir=\"LR\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "> 2021-11-02 04:18:42,142 [info] model m1 was loaded\n",
      "> 2021-11-02 04:18:42,142 [info] Initializing endpoint records\n",
      "> 2021-11-02 04:18:42,183 [info] Loaded ['m1']\n",
      "{'id': 'f713fd7eedeb431eba101b13c53a15b5'}\n"
     ]
    }
   ],
   "source": [
    "fn2_server = fn2.to_mock_server()\n",
    "\n",
    "result = fn2_server.test(\"/v2/models/m1/infer\", {\"inputs\": x})\n",
    "\n",
    "print(result)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Building distributed graphs\n",
    "\n",
    "Graphs can be hosted by a single function (using zero to n containers), or span multiple functions\n",
    "where each function can have its own container image and resources (replicas, GPUs/CPUs, volumes, etc.).\n",
    "It has a `root` function, which is where you configure triggers (http, incoming stream, cron, ..), \n",
    "and optional downstream child functions.\n",
    "\n",
    "You can specify the `function` attribute in `task` or `router` steps. This indicates where \n",
    "this step should run. When the `function` attribute is not specified it runs on the root function.</b>\n",
    "`function=\"*\"` means the step can run in any of the child functions.\n",
    "\n",
    "Steps on different functions should be connected using a `queue` step (a stream).\n",
    "\n",
    "**Adding a child function:**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "fn.add_child_function(\n",
    "    \"enrich\",\n",
    "    \"./entity_extraction.ipynb\",\n",
    "    image=\"mlrun/mlrun\",\n",
    "    requirements=[\"storey\", \"sklearn\"],\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "See a [full example with child functions](./model-serving-get-started.html#example-nlp-processing-pipeline-with-real-time-streaming).  \n",
    "\n",
    "A distributed graph looks like this:\n",
    "\n",
    "![distributed graph](../_static/images/graph-distributed.png)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.7"
  },
  "vscode": {
   "interpreter": {
    "hash": "916dbcbb3f70747c44a77c7bcd40155683ae19c65e1c03b4aa3499c5328201f1"
   }
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
